/*lint -e662 */
/*lint -e661 */

/*
 * Copyright (c) 2016 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl.
 *
 * Change Date: 2019-01-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */

/**
 * @file blr_master.c - contains code for the router to master communication
 *
 * The binlog router is designed to be used in replication environments to
 * increase the replication fanout of a master server. It provides a transparant
 * mechanism to read the binlog entries for multiple slaves while requiring
 * only a single connection to the actual master to support the slaves.
 *
 * The current prototype implement is designed to support MySQL 5.6 and has
 * a number of limitations. This prototype is merely a proof of concept and
 * should not be considered production ready.
 *
 * @verbatim
 * Revision History
 *
 * Date         Who                 Description
 * 02/04/2014   Mark Riddoch        Initial implementation
 * 07/05/2015   Massimiliano Pinto  Added MariaDB 10 Compatibility
 * 25/05/2015   Massimiliano Pinto  Added BLRM_SLAVE_STOPPED state
 * 08/06/2015   Massimiliano Pinto  Added m_errno and m_errmsg
 * 23/06/2015   Massimiliano Pinto  Master communication goes into BLRM_SLAVE_STOPPED state
 *                                  when an error is encountered in BLRM_BINLOGDUMP state.
 *                                  Server error code and msg are reported via SHOW SLAVE STATUS
 * 03/08/2015   Massimiliano Pinto  Initial implementation of transaction safety
 * 13/08/2015   Massimiliano Pinto  Addition of heartbeat check
 * 23/08/2015   Massimiliano Pinto  Added strerror_r
 * 26/08/2015   Massimiliano Pinto  Added MariaDB 10 GTID event check with flags = 0
 *                                  This is the current supported condition for detecting
 *                                  MariaDB 10 transaction start point.
 *                                  It's no longer using QUERY_EVENT with BEGIN
 * 25/09/2015   Massimiliano Pinto  Addition of lastEventReceived for slaves
 * 23/10/2015   Markus Makela       Added current_safe_event
 * 26/04/2016   Massimiliano Pinto  Added MariaDB 10.0 and 10.1 GTID event flags detection
 *
 * @endverbatim
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <service.h>
#include <server.h>
#include <router.h>
#include <atomic.h>
#include <session.h>
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <housekeeper.h>
#include <buffer.h>

#include <sys/types.h>
#include <sys/socket.h>

#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>

#include <rdtsc.h>
#include <thread.h>

/* Temporary requirement for auth data */
#include <mysql_client_server_protocol.h>


static GWBUF *blr_make_query(char *statement);
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
void encode_value(unsigned char *data, unsigned int value, int len);
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
static int  blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr,
                                  blr_thread_role_t role);
static void *CreateMySQLAuthData(char *username, char *password, char *database);
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
static void blr_log_packet(int priority, char *msg, uint8_t *ptr, int len);
void blr_master_close(ROUTER_INSTANCE *);
char *blr_extract_column(GWBUF *buf, int col);
void poll_fake_write_event(DCB *dcb);
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr,
                                unsigned long long pos_end);
static void blr_check_last_master_event(void *inst);
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
static void blr_log_identity(ROUTER_INSTANCE *router);
static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state,
                                         unsigned int err_code);

int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len);

static int keepalive = 1;

/**
 * blr_start_master - controls the connection of the binlog router to the
 * master MySQL server and triggers the slave registration process for
 * the router.
 *
 * @param   router      The router instance
 */
void
blr_start_master(void* data)
{
    ROUTER_INSTANCE *router = (ROUTER_INSTANCE*)data;
    DCB *client;

    router->stats.n_binlogs_ses = 0;
    spinlock_acquire(&router->lock);
    if (router->master_state != BLRM_UNCONNECTED)
    {
        if (router->master_state != BLRM_SLAVE_STOPPED)
        {
            MXS_ERROR("%s: Master Connect: Unexpected master state %s\n",
                      router->service->name, blrm_states[router->master_state]);
        }
        else
        {
            MXS_NOTICE("%s: Master Connect: binlog state is %s\n",
                       router->service->name, blrm_states[router->master_state]);
        }
        spinlock_release(&router->lock);
        return;
    }
    router->master_state = BLRM_CONNECTING;

    /* Discard the queued residual data */
    while (router->residual)
    {
        router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
    }
    router->residual = NULL;

    spinlock_release(&router->lock);
    if ((client = dcb_alloc(DCB_ROLE_INTERNAL, NULL)) == NULL)
    {
        MXS_ERROR("Binlog router: failed to create DCB for dummy client");
        return;
    }
    router->client = client;
    client->state = DCB_STATE_POLLING;  /* Fake the client is reading */
    client->data = CreateMySQLAuthData(router->user, router->password, "");
    if ((router->session = session_alloc(router->service, client)) == NULL)
    {
        MXS_ERROR("Binlog router: failed to create session for connection to master");
        return;
    }
    client->session = router->session;
    if ((router->master = dcb_connect(router->service->dbref->server, router->session, BLR_PROTOCOL)) == NULL)
    {
        char *name;
        if ((name = malloc(strlen(router->service->name) + strlen(" Master") + 1)) != NULL)
        {
            sprintf(name, "%s Master", router->service->name);
            hktask_oneshot(name, blr_start_master, router,
                           BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
            free(name);
        }
        if (router->retry_backoff > BLR_MAX_BACKOFF)
        {
            router->retry_backoff = BLR_MAX_BACKOFF;
        }
        MXS_ERROR("Binlog router: failed to connect to master server '%s'",
                  router->service->dbref->server->unique_name);
        return;
    }
    router->master->remote = strdup(router->service->dbref->server->name);

    MXS_NOTICE("%s: attempting to connect to master server %s:%d, binlog %s, pos %lu",
               router->service->name, router->service->dbref->server->name,
               router->service->dbref->server->port, router->binlog_name, router->current_pos);

    router->connect_time = time(0);

    if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
    {
        perror("setsockopt");
    }

    router->master_state = BLRM_AUTHENTICATED;
    router->master->func.write(router->master, blr_make_query("SELECT UNIX_TIMESTAMP()"));
    router->master_state = BLRM_TIMESTAMP;

    router->stats.n_masterstarts++;
}

/**
 * Reconnect to the master server.
 *
 * IMPORTANT - must be called with router->active_logs set by the
 * thread that set active_logs.
 *
 * @param   router      The router instance
 */
static void
blr_restart_master(ROUTER_INSTANCE *router)
{
    dcb_close(router->client);

    /* Discard the queued residual data */
    while (router->residual)
    {
        router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
    }
    router->residual = NULL;

    /* Now it is safe to unleash other threads on this router instance */
    spinlock_acquire(&router->lock);
    router->reconnect_pending = 0;
    router->active_logs = 0;
    spinlock_release(&router->lock);
    if (router->master_state < BLRM_BINLOGDUMP)
    {
        char *name;

        router->master_state = BLRM_UNCONNECTED;

        if ((name = malloc(strlen(router->service->name)  + strlen(" Master") + 1)) != NULL)
        {
            sprintf(name, "%s Master", router->service->name);
            hktask_oneshot(name, blr_start_master, router,
                           BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
            free(name);
        }
        if (router->retry_backoff > BLR_MAX_BACKOFF)
        {
            router->retry_backoff = BLR_MAX_BACKOFF;
        }
    }
    else
    {
        router->master_state = BLRM_UNCONNECTED;
        blr_start_master(router);
    }
}

/**
 * Request a reconnect to the master.
 *
 * If another thread is active processing messages from the master
 * then merely set a flag for that thread to do the restart. If no
 * threads are active then directly call the restart routine to
 * reconnect to the master.
 *
 * @param   router      The router instance
 */
void
blr_master_reconnect(ROUTER_INSTANCE *router)
{
    int do_reconnect = 0;

    if (router->master_state == BLRM_SLAVE_STOPPED)
    {
        return;
    }

    spinlock_acquire(&router->lock);
    if (router->active_logs)
    {
        /* Currently processing a response, set a flag
         * and get the thread that is process a response
         * to deal with the reconnect.
         */
        router->reconnect_pending = 1;
        router->stats.n_delayedreconnects++;
    }
    else
    {
        router->active_logs = 1;
        do_reconnect = 1;
    }
    spinlock_release(&router->lock);
    if (do_reconnect)
    {
        blr_restart_master(router);
        spinlock_acquire(&router->lock);
        router->active_logs = 0;
        spinlock_release(&router->lock);
    }
}

/**
 * Shutdown a connection to the master
 *
 * @param router    The router instance
 */
void
blr_master_close(ROUTER_INSTANCE *router)
{
    dcb_close(router->master);
    router->master_state = BLRM_UNCONNECTED;
    router->master_event_state = BLR_EVENT_DONE;
}

/**
 * Mark this master connection for a delayed reconnect, used during
 * error recovery to cause a reconnect after 60 seconds.
 *
 * @param router    The router instance
 */
void
blr_master_delayed_connect(ROUTER_INSTANCE *router)
{
    char *name;

    if ((name = malloc(strlen(router->service->name) + strlen(" Master Recovery") + 1)) != NULL)
    {
        sprintf(name, "%s Master Recovery", router->service->name);
        hktask_oneshot(name, blr_start_master, router, 60);
        free(name);
    }
}

/**
 * Binlog router master side state machine event handler.
 *
 * Handles an incoming response from the master server to the binlog
 * router.
 *
 * @param router    The router instance
 * @param buf       The incoming packet
 */
void
blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
{
    char query[BLRM_MASTER_REGITRATION_QUERY_LEN + 1];
    char task_name[BLRM_TASK_NAME_LEN + 1] = "";

    atomic_add(&router->handling_threads, 1);
    ss_dassert(router->handling_threads == 1);
    spinlock_acquire(&router->lock);
    router->active_logs = 1;
    spinlock_release(&router->lock);
    if (router->master_state > BLRM_MAXSTATE)
    {
        MXS_ERROR("Invalid master state machine state (%d) for binlog router.",
                  router->master_state);
        gwbuf_consume(buf, gwbuf_length(buf));

        spinlock_acquire(&router->lock);
        if (router->reconnect_pending)
        {
            router->active_logs = 0;
            spinlock_release(&router->lock);
            atomic_add(&router->handling_threads, -1);
            MXS_ERROR("%s: Pending reconnect in state %s.",
                      router->service->name,
                      blrm_states[router->master_state]);
            blr_restart_master(router);
            return;
        }
        router->active_logs = 0;
        spinlock_release(&router->lock);
        atomic_add(&router->handling_threads, -1);
        return;
    }

    if (router->master_state == BLRM_GTIDMODE && MYSQL_RESPONSE_ERR(buf))
    {
        /*
         * If we get an error response to the GTID Mode then we
         * asusme the server does not support GTID modes and
         * continue. The error is saved and replayed to slaves if
         * they also request the GTID mode.
         */
        MXS_ERROR("%s: Master server does not support GTID Mode.",
                  router->service->name);
    }
    else if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf))
    {
        char *msg_err = NULL;
        int msg_len = 0;
        int len = gwbuf_length(buf);
        unsigned long mysql_errno = extract_field(MYSQL_ERROR_CODE(buf), 16);

        msg_len = len - 7 - 6; // +7 is where msg starts, 6 is skipped the status message (#42000)
        msg_err = (char *)malloc(msg_len + 1);

        if (msg_err)
        {
            // skip status message only as MYSQL_RESPONSE_ERR(buf) points to GWBUF_DATA(buf) +7
            strncpy(msg_err, (char *)(MYSQL_ERROR_MSG(buf) + 6), msg_len);

            /* NULL terminated error string */
            *(msg_err + msg_len) = '\0';
        }

        MXS_ERROR("%s: Received error: %lu, '%s' from master during '%s' phase "
                  "of the master state machine.",
                  router->service->name,
                  mysql_errno,
                  msg_err ? msg_err : "(memory failure)",
                  blrm_states[router->master_state]);
        gwbuf_consume(buf, gwbuf_length(buf));

        spinlock_acquire(&router->lock);

        /* set mysql errno */
        router->m_errno = mysql_errno;

        /* set mysql error message */
        if (router->m_errmsg)
        {
            free(router->m_errmsg);
        }
        router->m_errmsg = msg_err ? msg_err : "(memory failure)";

        router->active_logs = 0;
        if (router->reconnect_pending)
        {
            spinlock_release(&router->lock);
            atomic_add(&router->handling_threads, -1);
            blr_restart_master(router);
            return;
        }
        spinlock_release(&router->lock);
        atomic_add(&router->handling_threads, -1);
        return;
    }
    switch (router->master_state)
    {
    case BLRM_TIMESTAMP:
        // Response to a timestamp message, no need to save this.
        gwbuf_consume(buf, GWBUF_LENGTH(buf));
        buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
        router->master_state = BLRM_SERVERID;
        router->master->func.write(router->master, buf);
        router->retry_backoff = 1;
        break;
    case BLRM_SERVERID:
        {
            char *val = blr_extract_column(buf, 2);

            // Response to fetch of master's server-id
            if (router->saved_master.server_id)
            {
                GWBUF_CONSUME_ALL(router->saved_master.server_id);
            }
            router->saved_master.server_id = buf;
            blr_cache_response(router, "serverid", buf);

            // set router->masterid from master server-id if it's not set by the config option
            if (router->masterid == 0)
            {
                router->masterid = atoi(val);
            }

            {
                char str[BLRM_SET_HEARTBEAT_QUERY_LEN];
                sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat);
                buf = blr_make_query(str);
            }
            router->master_state = BLRM_HBPERIOD;
            router->master->func.write(router->master, buf);
            free(val);
            break;
        }
    case BLRM_HBPERIOD:
        // Response to set the heartbeat period
        if (router->saved_master.heartbeat)
        {
            GWBUF_CONSUME_ALL(router->saved_master.heartbeat);
        }
        router->saved_master.heartbeat = buf;
        blr_cache_response(router, "heartbeat", buf);
        buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum");
        router->master_state = BLRM_CHKSUM1;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_CHKSUM1:
        // Response to set the master binlog checksum
        if (router->saved_master.chksum1)
        {
            GWBUF_CONSUME_ALL(router->saved_master.chksum1);
        }
        router->saved_master.chksum1 = buf;
        blr_cache_response(router, "chksum1", buf);
        buf = blr_make_query("SELECT @master_binlog_checksum");
        router->master_state = BLRM_CHKSUM2;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_CHKSUM2:
        {
            char *val = blr_extract_column(buf, 1);

            if (val && strncasecmp(val, "NONE", 4) == 0)
            {
                router->master_chksum = false;
            }
            if (val)
            {
                free(val);
            }
            // Response to the master_binlog_checksum, should be stored
            if (router->saved_master.chksum2)
            {
                GWBUF_CONSUME_ALL(router->saved_master.chksum2);
            }
            router->saved_master.chksum2 = buf;
            blr_cache_response(router, "chksum2", buf);

            if (router->mariadb10_compat)
            {
                buf = blr_make_query("SET @mariadb_slave_capability=4");
                router->master_state = BLRM_MARIADB10;
            }
            else
            {
                buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
                router->master_state = BLRM_GTIDMODE;
            }
            router->master->func.write(router->master, buf);
            break;
        }
    case BLRM_MARIADB10:
        // Response to the SET @mariadb_slave_capability=4, should be stored
        if (router->saved_master.mariadb10)
        {
            GWBUF_CONSUME_ALL(router->saved_master.mariadb10);
        }
        router->saved_master.mariadb10 = buf;
        blr_cache_response(router, "mariadb10", buf);
        buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
        router->master_state = BLRM_MUUID;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_GTIDMODE:
        // Response to the GTID_MODE, should be stored
        if (router->saved_master.gtid_mode)
        {
            GWBUF_CONSUME_ALL(router->saved_master.gtid_mode);
        }
        router->saved_master.gtid_mode = buf;
        blr_cache_response(router, "gtidmode", buf);
        buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
        router->master_state = BLRM_MUUID;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_MUUID:
        {
            char *key;
            char *val = NULL;

            key = blr_extract_column(buf, 1);
            if (key && strlen(key))
            {
                val = blr_extract_column(buf, 2);
            }
            if (key)
            {
                free(key);
            }

            /* set the master_uuid from master if not set by the option */
            if (router->set_master_uuid == NULL)
            {
                free(router->master_uuid);
                router->master_uuid = val;
            }
            else
            {
                router->master_uuid = router->set_master_uuid;
            }

            // Response to the SERVER_UUID, should be stored
            if (router->saved_master.uuid)
            {
                GWBUF_CONSUME_ALL(router->saved_master.uuid);
            }
            router->saved_master.uuid = buf;
            blr_cache_response(router, "uuid", buf);
            sprintf(query, "SET @slave_uuid='%s'", router->uuid);
            buf = blr_make_query(query);
            router->master_state = BLRM_SUUID;
            router->master->func.write(router->master, buf);
            break;
        }
    case BLRM_SUUID:
        // Response to the SET @server_uuid, should be stored
        if (router->saved_master.setslaveuuid)
        {
            GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid);
        }
        router->saved_master.setslaveuuid = buf;
        blr_cache_response(router, "ssuuid", buf);
        buf = blr_make_query("SET NAMES latin1");
        router->master_state = BLRM_LATIN1;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_LATIN1:
        // Response to the SET NAMES latin1, should be stored
        if (router->saved_master.setnames)
        {
            GWBUF_CONSUME_ALL(router->saved_master.setnames);
        }
        router->saved_master.setnames = buf;
        blr_cache_response(router, "setnames", buf);
        buf = blr_make_query("SET NAMES utf8");
        router->master_state = BLRM_UTF8;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_UTF8:
        // Response to the SET NAMES utf8, should be stored
        if (router->saved_master.utf8)
        {
            GWBUF_CONSUME_ALL(router->saved_master.utf8);
        }
        router->saved_master.utf8 = buf;
        blr_cache_response(router, "utf8", buf);
        buf = blr_make_query("SELECT 1");
        router->master_state = BLRM_SELECT1;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_SELECT1:
        // Response to the SELECT 1, should be stored
        if (router->saved_master.select1)
        {
            GWBUF_CONSUME_ALL(router->saved_master.select1);
        }
        router->saved_master.select1 = buf;
        blr_cache_response(router, "select1", buf);
        buf = blr_make_query("SELECT VERSION()");
        router->master_state = BLRM_SELECTVER;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_SELECTVER:
        // Response to SELECT VERSION should be stored
        if (router->saved_master.selectver)
        {
            GWBUF_CONSUME_ALL(router->saved_master.selectver);
        }
        router->saved_master.selectver = buf;
        blr_cache_response(router, "selectver", buf);
        buf = blr_make_query("SELECT @@version_comment limit 1");
        router->master_state = BLRM_SELECTVERCOM;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_SELECTVERCOM:
        // Response to SELECT @@version_comment should be stored
        if (router->saved_master.selectvercom)
        {
            GWBUF_CONSUME_ALL(router->saved_master.selectvercom);
        }
        router->saved_master.selectvercom = buf;
        blr_cache_response(router, "selectvercom", buf);
        buf = blr_make_query("SELECT @@hostname");
        router->master_state = BLRM_SELECTHOSTNAME;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_SELECTHOSTNAME:
        // Response to SELECT @@hostname should be stored
        if (router->saved_master.selecthostname)
        {
            GWBUF_CONSUME_ALL(router->saved_master.selecthostname);
        }
        router->saved_master.selecthostname = buf;
        blr_cache_response(router, "selecthostname", buf);
        buf = blr_make_query("SELECT @@max_allowed_packet");
        router->master_state = BLRM_MAP;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_MAP:
        // Response to SELECT @@max_allowed_packet should be stored
        if (router->saved_master.map)
        {
            GWBUF_CONSUME_ALL(router->saved_master.map);
        }
        router->saved_master.map = buf;
        blr_cache_response(router, "map", buf);
        buf = blr_make_registration(router);
        router->master_state = BLRM_REGISTER;
        router->master->func.write(router->master, buf);
        break;
    case BLRM_REGISTER:
        // Request a dump of the binlog file
        buf = blr_make_binlog_dump(router);
        router->master_state = BLRM_BINLOGDUMP;
        router->master->func.write(router->master, buf);
        MXS_NOTICE("%s: Request binlog records from %s at "
                   "position %lu from master server %s:%d",
                   router->service->name, router->binlog_name,
                   router->current_pos,
                   router->service->dbref->server->name,
                   router->service->dbref->server->port);

        /* Log binlog router identity */
        blr_log_identity(router);

        break;
    case BLRM_BINLOGDUMP:
        /**
         * Main body, we have received a binlog record from the master
         */
        blr_handle_binlog_record(router, buf);

        /**
         * Set heartbeat check task
         */
        snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name);
        hktask_add(task_name, blr_check_last_master_event, router, router->heartbeat);

        break;
    }

    if (router->reconnect_pending)
    {
        blr_restart_master(router);
    }
    spinlock_acquire(&router->lock);
    router->active_logs = 0;
    spinlock_release(&router->lock);
    atomic_add(&router->handling_threads, -1);
}

/**
 * Build a MySQL query into a GWBUF that we can send to the master database
 *
 * @param   query       The text of the query to send
 */
static GWBUF *
blr_make_query(char *query)
{
    GWBUF *buf;
    unsigned char *data;
    int len;

    if ((buf = gwbuf_alloc(strlen(query) + 5)) == NULL)
    {
        return NULL;
    }
    data = GWBUF_DATA(buf);
    len = strlen(query) + 1;
    encode_value(&data[0], len, 24);    // Payload length
    data[3] = 0;                // Sequence id
    // Payload
    data[4] = COM_QUERY;            // Command
    memcpy(&data[5], query, strlen(query));

    return buf;
}

/**
 * Build a MySQL slave registration into a GWBUF that we can send to the
 * master database
 *
 * @param   router  The router instance
 * @return  A MySQL Replication registration message in a GWBUF structure
 */
static GWBUF *
blr_make_registration(ROUTER_INSTANCE *router)
{
    GWBUF *buf;
    unsigned char *data;
    int len = 18;
    int port = 3306;

    if ((buf = gwbuf_alloc(len + 4)) == NULL)
    {
        return NULL;
    }
    data = GWBUF_DATA(buf);
    encode_value(&data[0], len, 24); // Payload length
    data[3] = 0;                     // Sequence ID
    data[4] = COM_REGISTER_SLAVE;    // Command
    encode_value(&data[5], router->serverid, 32);   // Slave Server ID
    data[9] = 0;                     // Slave hostname length
    data[10] = 0;                    // Slave username length
    data[11] = 0;                    // Slave password length
    if (router->service->ports)
    {
        port = router->service->ports->port;
    }
    encode_value(&data[12], port, 16);  // Slave master port
    encode_value(&data[14], 0, 32);         // Replication rank
    encode_value(&data[18], router->masterid, 32);  // Master server-id

    return buf;
}


/**
 * Build a Binlog dump command into a GWBUF that we can send to the
 * master database
 *
 * @param   router  The router instance
 * @return  A MySQL Replication COM_BINLOG_DUMP message in a GWBUF structure
 */
static GWBUF *
blr_make_binlog_dump(ROUTER_INSTANCE *router)
{
    GWBUF *buf;
    unsigned char *data;
    int binlog_file_len = strlen(router->binlog_name);
    /* COM_BINLOG_DUMP needs 11 bytes + binlogname (terminating NULL is not required) */
    int len = 11 + binlog_file_len;

    if ((buf = gwbuf_alloc(len + 4)) == NULL)
    {
        return NULL;
    }
    data = GWBUF_DATA(buf);

    encode_value(&data[0], len, 24);       // Payload length
    data[3] = 0;                           // Sequence ID
    data[4] = COM_BINLOG_DUMP;             // Command
    encode_value(&data[5],
                 router->current_pos, 32); // binlog position
    encode_value(&data[9], 0, 16);         // Flags
    encode_value(&data[11],
                 router->serverid, 32);    // Server-id of MaxScale
    memcpy((char *)&data[15], router->binlog_name,
           binlog_file_len);               // binlog filename
    return buf;
}


/**
 * Encode a value into a number of bits in a MySQL packet
 *
 * @param   data    Point to location in target packet
 * @param   value   The value to pack
 * @param   len Number of bits to encode value into
 */
void
encode_value(unsigned char *data, unsigned int value, int len)
{
    while (len > 0)
    {
        *data++ = value & 0xff;
        value >>= 8;
        len -= 8;
    }
}

/**
 * blr_handle_binlog_record - we have received binlog records from
 * the master and we must now work out what to do with them.
 *
 * @param router    The router instance
 * @param pkt       The binlog records
 */
void
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{
    uint8_t *msg = NULL, *ptr, *pdata;
    REP_HEADER hdr;
    unsigned int len = 0, reslen;
    unsigned int pkt_length;
    int no_residual = 1;
    int preslen = -1;
    int prev_length = -1;
    int n_bufs = -1, pn_bufs = -1;

    /*
     * Prepend any residual buffer to the buffer chain we have
     * been called with.
     */
    if (router->residual)
    {
        pkt = gwbuf_append(router->residual, pkt);
        router->residual = NULL;
        no_residual = 0;
    }

    pkt_length = gwbuf_length(pkt);
    /*
     * Loop over all the packets while we still have some data
     * and the packet length is enough to hold a replication event
     * header.
     */
    while (pkt && pkt_length > 24)
    {
        reslen = GWBUF_LENGTH(pkt);
        pdata = GWBUF_DATA(pkt);
        if (reslen < 3) // Payload length straddles buffers
        {
            /* Get the length of the packet from the residual and new packet */
            if (reslen >= 3)
            {
                len = EXTRACT24(pdata);
            }
            else if (reslen == 2)
            {
                len = EXTRACT16(pdata);
                len |= (*(uint8_t *)GWBUF_DATA(pkt->next) << 16);
            }
            else if (reslen == 1)
            {
                len = *pdata;
                len |= (EXTRACT16(GWBUF_DATA(pkt->next)) << 8);
            }
            len += 4;   // Allow space for the header
        }
        else
        {
            len = EXTRACT24(pdata) + 4;
        }
        /* len is now the payload length for the packet we are working on */

        if (reslen < len && pkt_length >= len)
        {
            /*
             * The message is contained in more than the current
             * buffer, however we have the complete messasge in
             * this buffer and the chain of remaining buffers.
             *
             * Allocate a contiguous buffer for the binlog message
             * and copy the complete message into this buffer.
             */
            int msg_remainder = len;
            GWBUF *p = pkt;

            if ((msg = malloc(len)) == NULL)
            {
                MXS_ERROR("Insufficient memory to buffer event "
                          "of %d bytes. Binlog %s @ %lu.",
                          len, router->binlog_name,
                          router->current_pos);
                break;
            }

            n_bufs = 0;
            ptr = msg;
            while (p && msg_remainder > 0)
            {
                int plen = GWBUF_LENGTH(p);
                int n = (msg_remainder > plen ? plen : msg_remainder);
                memcpy(ptr, GWBUF_DATA(p), n);
                msg_remainder -= n;
                ptr += n;
                if (msg_remainder > 0)
                {
                    p = p->next;
                }
                n_bufs++;
            }
            if (msg_remainder)
            {
                MXS_ERROR("Expected entire message in buffer "
                          "chain, but failed to create complete "
                          "message as expected. %s @ %lu",
                          router->binlog_name,
                          router->current_pos);
                free(msg);
                /* msg = NULL; Not needed unless msg will be referred to again */
                break;
            }

            ptr = msg;
        }
        else if (reslen < len)
        {
            /*
             * The message is not fully contained in the current
             * and we do not have the complete message in the
             * buffer chain. Therefore we must stop processing
             * until we receive the next buffer.
             */
            router->stats.n_residuals++;
            MXS_DEBUG("Residual data left after %lu records. %s @ %lu",
                      router->stats.n_binlogs,
                      router->binlog_name, router->current_pos);
            break;
        }
        else
        {
            /*
             * The message is fully contained in the current buffer
             */
            ptr = pdata;
            n_bufs = 1;
        }

        /*
         * ptr now points at the current message in a contiguous buffer,
         * this buffer is either within the GWBUF or in a malloc'd
         * copy if the message straddles GWBUF's.
         */

        if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING)
        {
            char *event_msg = "";

            /* Packet is too small to be a binlog event */
            if (ptr[4] == 0xfe) /* EOF Packet */
            {
                event_msg = "end of file";
            }
            else if (ptr[4] == 0xff)    /* EOF Packet */
            {
                event_msg = "error";
            }
            MXS_NOTICE("Non-event message (%s) from master.", event_msg);
        }
        else
        {
            if (router->master_event_state == BLR_EVENT_DONE)
            {
                spinlock_acquire(&router->lock);
                router->stats.n_binlogs++;
                router->stats.n_binlogs_ses++;
                spinlock_release(&router->lock);

                blr_extract_header(ptr, &hdr);

                /* Sanity check */
                if (hdr.ok == 0)
                {
                    if (hdr.event_size != len - 5 && (hdr.event_size + 1) < MYSQL_PACKET_LENGTH_MAX)
                    {
                        MXS_ERROR("Packet length is %d, but event size is %d, "
                                  "binlog file %s position %lu "
                                  "reslen is %d and preslen is %d, "
                                  "length of previous event %d. %s",
                                  len, hdr.event_size,
                                  router->binlog_name,
                                  router->current_pos,
                                  reslen, preslen, prev_length,
                                  (prev_length == -1 ?
                                   (no_residual ? "No residual data from previous call" :
                                    "Residual data from previous call") : ""));

                        blr_log_packet(LOG_ERR, "Packet:", ptr, len);

                        MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, "
                                  "the previous events was contained in %d GWBUFs",
                                  router->lastEventReceived, n_bufs, pn_bufs);

                        if (msg)
                        {
                            free(msg);
                            /* msg = NULL; Not needed unless msg will be referred to again */
                        }

                        break;
                    }
                    else if ((hdr.event_size + 1) >= MYSQL_PACKET_LENGTH_MAX)
                    {
                        router->master_event_state = BLR_EVENT_STARTED;

                        /** Store the header for later use */
                        memcpy(&router->stored_header, &hdr, sizeof(hdr));
                    }

                    /** Prepare the checksum variables for this event */
                    router->stored_checksum = crc32(0L, NULL, 0);
                    router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
                    router->partial_checksum_bytes = 0;
                }
                else
                {
                    /* Terminate replication and exit from main loop */
                    blr_terminate_master_replication(router, ptr, len);

                    gwbuf_free(pkt);
                    pkt = NULL;
                    pkt_length = 0;

                    break;
                }

                if (hdr.ok == 0)
                {
                    spinlock_acquire(&router->lock);

                    /* set mysql errno to 0 */
                    router->m_errno = 0;

                    /* Remove error message */
                    if (router->m_errmsg)
                    {
                        free(router->m_errmsg);
                    }
                    router->m_errmsg = NULL;

                    spinlock_release(&router->lock);
#ifdef SHOW_EVENTS
                    printf("blr: len %lu, event type 0x%02x, flags 0x%04x, "
                           "event size %d, event timestamp %lu\n",
                           (unsigned long)len - 4,
                           hdr.event_type,
                           hdr.flags,
                           hdr.event_size,
                           (unsigned long)hdr.timestamp);
#endif
                }
            }

            /* pending large event */
            if (router->master_event_state != BLR_EVENT_DONE)
            {
                if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX)
                {
                    /** This is the last packet, we can now proceed to distribute
                     * the event afer it has been written to disk */
                    ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE);
                    router->master_event_state = BLR_EVENT_COMPLETE;
                    memcpy(&hdr, &router->stored_header, sizeof(hdr));
                }
                else
                {
                    /* current partial event is being written to disk file */
                    uint32_t offset = MYSQL_HEADER_LEN;
                    uint32_t extra_bytes = MYSQL_HEADER_LEN;

                    /** Don't write the OK byte into the binlog */
                    if (router->master_event_state == BLR_EVENT_STARTED)
                    {
                        offset = MYSQL_HEADER_LEN + 1;
                        router->master_event_state = BLR_EVENT_ONGOING;
                        extra_bytes = MYSQL_HEADER_LEN + 1;
                    }

                    if (router->master_chksum)
                    {
                        uint32_t size = (len - extra_bytes) < router->checksum_size ?
                            len - extra_bytes : router->checksum_size;
                        router->stored_checksum = crc32(router->stored_checksum,
                                                        ptr + offset,
                                                        size);
                        router->checksum_size -= size;

                        if (router->checksum_size == 0 && size < len - offset)
                        {
                            extract_checksum(router, ptr + offset + size, len - offset - size);
                        }
                    }

                    if (blr_write_data_into_binlog(router, len - offset, ptr + offset) == 0)
                    {
                        /** Failed to write to the binlog file, destroy the buffer
                         * chain and close the connection with the master */
                        while (pkt)
                        {
                            pkt = GWBUF_CONSUME_ALL(pkt);
                        }
                        blr_master_close(router);
                        blr_master_delayed_connect(router);
                        return;
                    }
                    pkt = gwbuf_consume(pkt, len);
                    pkt_length -= len;
                    continue;
                }
            }

            /*
             * First check that the checksum we calculate matches the
             * checksum in the packet we received.
             */
            if (router->master_chksum)
            {
                uint32_t pktsum, offset = MYSQL_HEADER_LEN;
                uint32_t size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN;

                if (router->master_event_state == BLR_EVENT_DONE)
                {
                    /** Set the pointer offset to the first byte after
                     * the header and OK byte */
                    offset = MYSQL_HEADER_LEN + 1;
                    size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN - 1;
                }

                size = MIN(size, router->checksum_size);

                if (router->checksum_size > 0)
                {
                    router->stored_checksum = crc32(router->stored_checksum,
                                                    ptr + offset,
                                                    size);
                    router->checksum_size -= size;
                }

                if (router->checksum_size == 0 && size < len - offset)
                {
                    extract_checksum(router, ptr + offset + size, len - offset - size);
                }

                if (router->partial_checksum_bytes == MYSQL_CHECKSUM_LEN)
                {
                    pktsum = EXTRACT32(router->partial_checksum);
                    if (pktsum != router->stored_checksum)
                    {
                        router->stats.n_badcrc++;
                        free(msg);
                        /* msg = NULL; Not needed unless msg will be referred to again */
                        MXS_ERROR("%s: Checksum error in event from master, "
                                  "binlog %s @ %lu. Closing master connection.",
                                  router->service->name, router->binlog_name,
                                  router->current_pos);
                        blr_master_close(router);
                        blr_master_delayed_connect(router);
                        return;
                    }
                }
                else
                {
                    pkt = gwbuf_consume(pkt, len);
                    pkt_length -= len;
                    continue;
                }
            }

            if (hdr.ok == 0)
            {
                router->lastEventReceived = hdr.event_type;
                router->lastEventTimestamp = hdr.timestamp;

                /**
                 * Check for an open transaction, if the option is set
                 * Only complete transactions should be sent to sleves
                 *
                 * If a trasaction is pending router->binlog_position
                 * won't be updated to router->current_pos
                 */

                spinlock_acquire(&router->binlog_lock);
                if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0))
                {
                    /* no pending transaction: set current_pos to binlog_position */
                    router->binlog_position = router->current_pos;
                    router->current_safe_event = router->current_pos;
                }
                spinlock_release(&router->binlog_lock);

                /**
                 * Detect transactions in events
                 * Only complete transactions should be sent to sleves
                */

                /* If MariaDB 10 compatibility:
                * check for MARIADB10_GTID_EVENT with flags = 0
                  * This marks the transaction starts instead of
                  * QUERY_EVENT with "BEGIN"
                 */
                if (router->trx_safe && router->master_event_state == BLR_EVENT_DONE)
                {
                    if (router->mariadb10_compat)
                    {
                        if (hdr.event_type == MARIADB10_GTID_EVENT)
                        {
                            uint64_t n_sequence;
                            uint32_t domainid;
                            unsigned int flags;
                            n_sequence = extract_field(ptr + 4 + 20, 64);
                            domainid = extract_field(ptr + 4 + 20 + 8, 32);
                            flags = *(ptr + 4 + 20 + 8 + 4);

                            if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
                            {
                                spinlock_acquire(&router->binlog_lock);

                                if (router->pending_transaction > 0)
                                {
                                    MXS_ERROR("A MariaDB 10 transaction "
                                              "is already open "
                                              "@ %lu (GTID %u-%u-%lu) and "
                                              "a new one starts @ %lu",
                                              router->binlog_position,
                                              domainid, hdr.serverid,
                                              n_sequence,
                                              router->current_pos);

                                    // An action should be taken here
                                }

                                router->pending_transaction = 1;

                                spinlock_release(&router->binlog_lock);
                            }
                        }
                    }

                    /**
                     * look for QUERY_EVENT [BEGIN / COMMIT] and XID_EVENT
                     */

                    if (hdr.event_type == QUERY_EVENT)
                    {
                        char *statement_sql;
                        int db_name_len, var_block_len, statement_len;
                        db_name_len = ptr[4 + 20 + 4 + 4];
                        var_block_len = ptr[4 + 20 + 4 + 4 + 1 + 2];

                        statement_len = len - (4 + 20 + 4 + 4 + 1 + 2 + 2 + var_block_len + 1 + db_name_len);
                        statement_sql = calloc(1, statement_len + 1);
                        strncpy(statement_sql,
                                (char *)ptr + 4 + 20 + 4 + 4 + 1 + 2 + 2 + var_block_len + 1 + db_name_len,
                                statement_len);

                        spinlock_acquire(&router->binlog_lock);

                        /* Check for BEGIN (it comes for START TRANSACTION too) */
                        if (strncmp(statement_sql, "BEGIN", 5) == 0)
                        {
                            if (router->pending_transaction > 0)
                            {
                                MXS_ERROR("A transaction is already open "
                                          "@ %lu and a new one starts @ %lu",
                                          router->binlog_position,
                                          router->current_pos);

                                // An action should be taken here
                            }

                            router->pending_transaction = 1;
                        }

                        /* Check for COMMIT in non transactional store engines */
                        if (strncmp(statement_sql, "COMMIT", 6) == 0)
                        {
                            router->pending_transaction = 2;
                        }

                        spinlock_release(&router->binlog_lock);

                        free(statement_sql);
                    }

                    /* Check for COMMIT in Transactional engines, i.e InnoDB */
                    if (hdr.event_type == XID_EVENT)
                    {
                        spinlock_acquire(&router->binlog_lock);

                        if (router->pending_transaction)
                        {
                            router->pending_transaction = 3;
                        }
                        spinlock_release(&router->binlog_lock);
                    }
                }

                /** Gather statistics about the replication event types */
                int event_limit = router->mariadb10_compat ?
                                  MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;

                if (hdr.event_type <= event_limit)
                {
                    router->stats.events[hdr.event_type]++;
                }

                if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
                {
                    // Fake format description message
                    MXS_DEBUG("Replication fake event. "
                              "Binlog %s @ %lu.",
                              router->binlog_name,
                              router->current_pos);
                    router->stats.n_fakeevents++;

                    if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
                    {
                        uint8_t *new_fde;
                        unsigned int new_fde_len;
                        /*
                         * We need to save this to replay to new
                         * slaves that attach later.
                         */
                        new_fde_len = hdr.event_size;
                        new_fde = malloc(hdr.event_size);
                        if (new_fde)
                        {
                            memcpy(new_fde, ptr + 5, hdr.event_size);
                            if (router->saved_master.fde_event)
                            {
                                free(router->saved_master.fde_event);
                            }
                            router->saved_master.fde_event = new_fde;
                            router->saved_master.fde_len = new_fde_len;
                        }
                        else
                        {
                            MXS_ERROR("%s: Received a format description "
                                      "event that MaxScale was unable to "
                                      "record. Event length is %d.",
                                      router->service->name,
                                      hdr.event_size);
                            blr_log_packet(LOG_ERR,
                                           "Format Description Event:", ptr, len);
                        }
                    }
                }
                else
                {
                    if (hdr.event_type == HEARTBEAT_EVENT)
                    {
#ifdef SHOW_EVENTS
                        printf("Replication heartbeat\n");
#endif
                        MXS_DEBUG("Replication heartbeat. "
                                  "Binlog %s @ %lu.",
                                  router->binlog_name,
                                  router->current_pos);

                        router->stats.n_heartbeats++;

                        if (router->pending_transaction)
                        {
                            router->stats.lastReply = time(0);
                        }
                    }
                    else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
                    {
                        ptr = ptr + 4; // Skip header
                        uint32_t offset = 4;

                        if (router->master_event_state == BLR_EVENT_STARTED ||
                            router->master_event_state == BLR_EVENT_DONE)
                        {
                            ptr++;
                            offset++;
                        }

                        if (hdr.event_type == ROTATE_EVENT)
                        {
                            spinlock_acquire(&router->binlog_lock);
                            router->rotating = 1;
                            spinlock_release(&router->binlog_lock);
                        }

                        /* Current event is being written to disk file.
                         * It is possible for an empty packet to be sent if an
                         * event is exactly 2^24 bytes long. In this case the
                         * empty packet should be discarded. */

                        if (len > MYSQL_HEADER_LEN &&
                            blr_write_binlog_record(router, &hdr, len - offset, ptr) == 0)
                        {
                            /*
                             * Failed to write to the
                             * binlog file, destroy the
                             * buffer chain and close the
                             * connection with the master
                             */
                            while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
                            {
                                ;
                            }
                            blr_master_close(router);
                            blr_master_delayed_connect(router);
                            return;
                        }

                        /* Check for rotete event */
                        if (hdr.event_type == ROTATE_EVENT)
                        {
                            if (!blr_rotate_event(router, ptr, &hdr))
                            {
                                /*
                                 * Failed to write to the
                                 * binlog file, destroy the
                                 * buffer chain and close the
                                 * connection with the master
                                 */
                                while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
                                {
                                    ;
                                }
                                blr_master_close(router);
                                blr_master_delayed_connect(router);
                                return;
                            }
                        }

                        /**
                         * Distributing binlog events to slaves
                         * may depend on pending transaction
                         */

                        spinlock_acquire(&router->binlog_lock);

                        if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0))
                        {
                            router->binlog_position = router->current_pos;
                            router->current_safe_event = router->last_event_pos;

                            spinlock_release(&router->binlog_lock);

                            if (router->master_event_state == BLR_EVENT_COMPLETE)
                            {
                                /** Read the complete event from the disk */
                                GWBUF *record = blr_read_events_from_pos(router,
                                                                         router->last_event_pos,
                                                                         &hdr, hdr.next_pos);
                                if (record)
                                {
                                    uint8_t *data = GWBUF_DATA(record);
                                    blr_distribute_binlog_record(router, &hdr, data,
                                                                 BLR_THREAD_ROLE_MASTER_LARGE_NOTRX);
                                    gwbuf_free(record);
                                }
                                else
                                {
                                    MXS_ERROR("Failed to read event at position"
                                              "%lu with a size of %u bytes.",
                                              router->last_event_pos, hdr.event_size);
                                }
                            }
                            else
                            {
                                /* Now distribute events */
                                blr_distribute_binlog_record(router, &hdr, ptr,
                                                             BLR_THREAD_ROLE_MASTER_NOTRX);
                            }
                        }
                        else
                        {
                            /**
                             * If transaction is closed:
                             *
                             * 1) read current binlog starting
                             *  from router->binlog_position
                             *
                             * 2) distribute read event
                             *
                             * 3) set router->binlog_position to
                             *    router->current_pos
                             *
                             */

                            if (router->pending_transaction > 1)
                            {
                                unsigned long long pos;
                                unsigned long long end_pos;
                                GWBUF *record;
                                uint8_t *raw_data;
                                REP_HEADER new_hdr;

                                pos = router->binlog_position;
                                end_pos = router->current_pos;

                                spinlock_release(&router->binlog_lock);

                                while ((record = blr_read_events_from_pos(router,
                                                                          pos,
                                                                          &new_hdr,
                                                                          end_pos)) != NULL)
                                {
                                    raw_data = GWBUF_DATA(record);

                                    /* distribute event */
                                    blr_distribute_binlog_record(router, &new_hdr, raw_data,
                                                                 BLR_THREAD_ROLE_MASTER_TRX);

                                    spinlock_acquire(&router->binlog_lock);

                                    /** The current safe position is only updated
                                    * if it points to the event we just distributed. */
                                    if (router->current_safe_event == pos)
                                    {
                                        router->current_safe_event = new_hdr.next_pos;
                                    }
                                    else
                                    {
                                        MXS_ERROR("Current safe event (%lu) does"
                                                  " not point at the event we "
                                                  "just sent (%llu) from binlog file %s. "
                                                  "Last commit at %lu, last write at %lu.",
                                                  router->current_safe_event, pos,
                                                  router->binlog_name, router->last_safe_pos,
                                                  router->last_written);
                                    }

                                    pos = new_hdr.next_pos;

                                    spinlock_release(&router->binlog_lock);

                                    gwbuf_free(record);
                                }

                                /* Check whether binlog records has been read in previous loop */
                                if (pos < router->current_pos)
                                {
                                    char err_message[BINLOG_ERROR_MSG_LEN + 1];

                                    err_message[BINLOG_ERROR_MSG_LEN] = '\0';

                                    /* No event has been sent */
                                    if (pos == router->binlog_position)
                                    {
                                        MXS_ERROR("No events distributed to slaves for a pending "
                                                  "transaction in %s at %lu. "
                                                  "Last event from master at %lu",
                                                  router->binlog_name,
                                                  router->binlog_position,
                                                  router->current_pos);

                                        strncpy(err_message,
                                                "No transaction events sent", BINLOG_ERROR_MSG_LEN);
                                    }
                                    else
                                    {
                                        /* Some events have been sent */
                                        MXS_ERROR("Some events were not distributed to slaves for a "
                                                  "pending transaction in %s at %lu. Last distributed "
                                                  "even at %llu, last event from master at %lu",
                                                  router->binlog_name,
                                                  router->binlog_position,
                                                  pos,
                                                  router->current_pos);

                                        strncpy(err_message,
                                                "Incomplete transaction events sent", BINLOG_ERROR_MSG_LEN);
                                    }

                                    /* distribute error message to registered slaves */
                                    blr_distribute_error_message(router, err_message, "HY000", 1236);
                                }

                                /* update binlog_position and set pending to 0 */
                                spinlock_acquire(&router->binlog_lock);

                                router->binlog_position = router->current_pos;
                                router->pending_transaction = 0;

                                spinlock_release(&router->binlog_lock);
                            }
                            else
                            {
                                spinlock_release(&router->binlog_lock);
                            }
                        }
                    }
                    else
                    {
                        router->stats.n_artificial++;
                        MXS_DEBUG("Artificial event not written "
                                  "to disk or distributed. "
                                  "Type 0x%x, Length %d, Binlog "
                                  "%s @ %lu.",
                                  hdr.event_type,
                                  hdr.event_size,
                                  router->binlog_name,
                                  router->current_pos);
                        ptr += 5;
                        if (hdr.event_type == ROTATE_EVENT)
                        {
                            spinlock_acquire(&router->binlog_lock);
                            router->rotating = 1;
                            spinlock_release(&router->binlog_lock);
                            if (!blr_rotate_event(router, ptr, &hdr))
                            {
                                /*
                                 * Failed to write to the
                                 * binlog file, destroy the
                                 * buffer chain and close the
                                 * connection with the master
                                 */
                                while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
                                {
                                    ;
                                }
                                blr_master_close(router);
                                blr_master_delayed_connect(router);
                                return;
                            }
                        }
                    }
                }

                /** A large event is now fully received and processed */
                if (router->master_event_state == BLR_EVENT_COMPLETE)
                {
                    router->master_event_state = BLR_EVENT_DONE;
                }
            }
            else
            {
                blr_terminate_master_replication(router, ptr, len);
            }
        }

        if (msg)
        {
            free(msg);
            msg = NULL;
        }
        prev_length = len;
        while (len > 0)
        {
            unsigned int n, plen;
            plen = GWBUF_LENGTH(pkt);
            n = (plen < len ? plen : len);
            pkt = gwbuf_consume(pkt, n);
            len -= n;
            pkt_length -= n;
        }
        preslen = reslen;
        pn_bufs = n_bufs;
    }

    /*
     * Check if we have a residual, part binlog message to deal with.
     * Just simply store the GWBUF for next time
     */
    if (pkt)
    {
        router->residual = pkt;
        ss_dassert(pkt_length != 0);
    }
    else
    {
        ss_dassert(pkt_length == 0);
    }
    blr_file_flush(router);
}

/**
 * Populate a header structure for a replication message from a GWBUF structure.
 *
 * @param pkt   The incoming packet in a GWBUF chain
 * @param hdr   The packet header to populate
 */
void
blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
{

    hdr->payload_len = EXTRACT24(ptr);
    hdr->seqno = ptr[3];
    hdr->ok = ptr[4];
    hdr->timestamp = EXTRACT32(&ptr[5]);
    hdr->event_type = ptr[9];
    hdr->serverid = EXTRACT32(&ptr[10]);
    hdr->event_size = EXTRACT32(&ptr[14]);
    hdr->next_pos = EXTRACT32(&ptr[18]);
    hdr->flags = EXTRACT16(&ptr[22]);
}

/**
 * Process a binlog rotate event.
 *
 * @param router    The instance of the router
 * @param ptr       The packet containing the rotate event
 * @param hdr       The replication message header
 * @return          1 if the file could be rotated, 0 otherwise.
 */
static int
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
{
    int len, slen;
    uint64_t pos;
    char file[BINLOG_FNAMELEN + 1];

    ptr += 19;      // Skip event header
    len = hdr->event_size - 19; // Event size minus header
    pos = extract_field(ptr + 4, 32);
    pos <<= 32;
    pos |= extract_field(ptr, 32);
    slen = len - (8 + 4);       // Allow for position and CRC
    if (router->master_chksum == 0)
    {
        slen += 4;
    }
    if (slen > BINLOG_FNAMELEN)
    {
        slen = BINLOG_FNAMELEN;
    }
    memcpy(file, ptr + 8, slen);
    file[slen] = 0;

#ifdef VERBOSE_ROTATE
    printf("binlog rotate: ");
    while (len--)
    {
        printf("0x%02x ", *ptr++);
    }
    printf("\n");
    printf("New file: %s @ %ld\n", file, pos);
#endif

    strcpy(router->prevbinlog, router->binlog_name);

    int rotated = 1;

    if (strncmp(router->binlog_name, file, slen) != 0)
    {
        router->stats.n_rotates++;
        if (blr_file_rotate(router, file, pos) == 0)
        {
            rotated = 0;
        }
    }
    spinlock_acquire(&router->binlog_lock);
    router->rotating = 0;
    spinlock_release(&router->binlog_lock);
    return rotated;
}

/**
 * Create the auth data needed to be able to call dcb_connect.
 *
 * This doesn't really belong here and should be moved at some stage.
 */
static void *
CreateMySQLAuthData(char *username, char *password, char *database)
{
    MYSQL_session *auth_info;

    if (username == NULL || password == NULL)
    {
        MXS_ERROR("You must specify both username and password for the binlog router.\n");
        return NULL;
    }

    if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL)
    {
        return NULL;
    }
    strncpy(auth_info->user, username, MYSQL_USER_MAXLEN);
    strncpy(auth_info->db, database, MYSQL_DATABASE_MAXLEN);
    gw_sha1_str((const uint8_t *)password, strlen(password), auth_info->client_sha1);

    return auth_info;
}

/** Actions that can be taken when an event is being distributed to the slaves*/
typedef enum
{
    SLAVE_SEND_EVENT, /*< Send the event to the slave */
    SLAVE_FORCE_CATCHUP, /*< Force the slave into catchup mode */
    SLAVE_EVENT_ALREADY_SENT /*< The slave already has the event, don't send it */
} slave_event_action_t;

/**
 * Distribute the binlog record we have just received to all the registered slaves.
 *
 * @param   router      The router instance
 * @param   hdr     The replication event header
 * @param   ptr     The raw replication event data
 */
void
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr,
                             blr_thread_role_t role)
{
    ROUTER_SLAVE *slave;
    int action;
    unsigned int cstate;

    spinlock_acquire(&router->lock);
    slave = router->slaves;
    while (slave)
    {
        if (slave->state != BLRS_DUMPING)
        {
            slave = slave->next;
            continue;
        }
        spinlock_acquire(&slave->catch_lock);
        if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == CS_UPTODATE)
        {
            /*
             * This slave is reporting it is to date with the binlog of the
             * master running on this slave.
             * It has no thread running currently that is sending binlog
             * events.
             */
            action = 1;
            slave->cstate |= CS_BUSY;
        }
        else if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == (CS_UPTODATE | CS_BUSY))
        {
            /*
             * The slave is up to date with the binlog and a process is
             * running on this slave to send binlog events.
             */
            slave->overrun = 1;
            action = 2;
        }
        else if ((slave->cstate & CS_UPTODATE) == 0)
        {
            /* Slave is in catchup mode */
            action = 3;
        }
        else
        {
            MXS_ERROR("slave->cstate does not contain a meaningful state %d", slave->cstate);
            action = 0;
        }
        slave->stats.n_actions[action - 1]++;
        spinlock_release(&slave->catch_lock);

        if (action == 1)
        {
            spinlock_acquire(&router->binlog_lock);

            slave_event_action_t slave_action = SLAVE_FORCE_CATCHUP;
            const bool same_file = strcmp(slave->binlogfile, router->binlog_name) == 0;
            const bool rotate = hdr->event_type == ROTATE_EVENT &&
                strcmp(slave->binlogfile, router->prevbinlog) == 0;

            if (router->trx_safe && (same_file || rotate) &&
                slave->binlog_pos == router->current_safe_event)
            {
                /** Slave needs the current event being distributed */
                slave_action = SLAVE_SEND_EVENT;
            }
            else if (!router->trx_safe && (same_file || rotate) &&
                     slave->binlog_pos == router->last_event_pos)
            {
                /** Transaction safety is off */
                slave_action = SLAVE_SEND_EVENT;
            }
            else if (same_file)
            {
                if (slave->binlog_pos == hdr->next_pos)
                {
                    /*
                     * Slave has already read record from file, no
                     * need to distrbute this event
                     */
                    slave_action = SLAVE_EVENT_ALREADY_SENT;
                }
                else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size))
                {
                    /*
                     * The slave is ahead of the master, this should never
                     * happen. Force the slave to catchup mode in order to
                     * try to resolve the issue.
                     */
                    MXS_ERROR("Slave %s:%d server ID %d is ahead of expected position %s@%u. "
                              "Expected position %d", slave->dcb->remote,
                              ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
                              slave->binlogfile, slave->binlog_pos,
                              hdr->next_pos - hdr->event_size);
                }
                /* If none of the above conditions were met, a slave in catchup
                 * mode transitioned into up-to-date mode while we were
                 * distributing events. The slave needs to be forced into
                 * catchup mode since it isn't up to date anymore. */
            }
            else if (rotate)
            {
                /** Slave is more than one binlog file behind */
                MXS_WARNING("Slave %s:%d server ID %d is behind more than one binlog file "
                          "from the master. Slave is using '%s' with position %d "
                          "when master binlog file is '%s'.", slave->dcb->remote,
                          ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
                          slave->binlogfile, slave->binlog_pos, router->binlog_name);
            }
            else
            {
                /** Slave is lagging behind */
                MXS_WARNING("Slave %s:%d server ID %d is using binlog file '%s' with "
                          "position %d. Master binlog file is '%s' at position %lu "
                          "with last safe event at %lu.", slave->dcb->remote,
                          ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
                          slave->binlogfile, slave->binlog_pos, router->binlog_name,
                          router->current_pos, router->current_safe_event);
            }

            spinlock_release(&router->binlog_lock);

            /*
             * If slave_action is SLAVE_FORCE_CATCHUP then
             * the slave is not at the position it should be. Force it into
             * catchup mode rather than send this event.
             */

            switch (slave_action)
            {
                char binlog_name[BINLOG_FNAMELEN + 1];
                uint32_t binlog_pos;

            case SLAVE_SEND_EVENT:
                /*
                 * The slave should be up to date, check that the binlog
                 * position matches the event we have to distribute or
                 * this is a rotate event. Send the event directly from
                 * memory to the slave.
                 */
                slave->lastEventTimestamp = hdr->timestamp;
                slave->lastEventReceived = hdr->event_type;

                /* set lastReply */
                if (router->send_slave_heartbeat)
                {
                    slave->lastReply = time(0);
                }

                strcpy(binlog_name, slave->binlogfile);
                binlog_pos = slave->binlog_pos;

                if (hdr->event_type == ROTATE_EVENT)
                {
                    blr_slave_rotate(router, slave, ptr);
                }

                if (blr_send_event(role, binlog_name, binlog_pos, slave, hdr, ptr))
                {
                    spinlock_acquire(&slave->catch_lock);
                    if (hdr->event_type != ROTATE_EVENT)
                    {
                        slave->binlog_pos = hdr->next_pos;
                    }
                    if (slave->overrun)
                    {
                        slave->stats.n_overrun++;
                        slave->overrun = 0;
                        poll_fake_write_event(slave->dcb);
                    }
                    else
                    {
                        slave->cstate &= ~CS_BUSY;
                    }
                    spinlock_release(&slave->catch_lock);
                }
                else
                {
                    MXS_WARNING("Slave %s:%i, server-id %d, binlog '%s, position %u: "
                                "Master-thread could not send event to slave, closing connection.",
                                slave->dcb->remote,
                                ntohs((slave->dcb->ipv4).sin_port),
                                slave->serverid,
                                binlog_name,
                                binlog_pos);
                    slave->state = BLRS_ERRORED;
                    dcb_close(slave->dcb);
                }
                break;

            case SLAVE_EVENT_ALREADY_SENT:
                spinlock_acquire(&slave->catch_lock);
                slave->cstate &= ~CS_BUSY;
                spinlock_release(&slave->catch_lock);
                break;

            case SLAVE_FORCE_CATCHUP:
                spinlock_acquire(&slave->catch_lock);
                cstate = slave->cstate;
                slave->cstate &= ~(CS_UPTODATE | CS_BUSY);
                slave->cstate |= CS_EXPECTCB;
                spinlock_release(&slave->catch_lock);
                if ((cstate & CS_UPTODATE) == CS_UPTODATE)
                {
#ifdef STATE_CHANGE_LOGGING_ENABLED
                    MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from "
                               "up-to-date to catch-up in blr_distribute_binlog_record, "
                               "binlog file '%s', position %lu.",
                               router->service->name,
                               slave->dcb->remote,
                               ntohs((slave->dcb->ipv4).sin_port),
                               slave->serverid,
                               slave->binlogfile, (unsigned long)slave->binlog_pos);
#endif
                }
                poll_fake_write_event(slave->dcb);
                break;
            }
        }
        else if (action == 3)
        {
            /* Slave is not up to date
             * Check if it is either expecting a callback or
             * is busy processing a callback
             */
            spinlock_acquire(&slave->catch_lock);
            if ((slave->cstate & (CS_EXPECTCB | CS_BUSY)) == 0)
            {
                slave->cstate |= CS_EXPECTCB;
                spinlock_release(&slave->catch_lock);
                poll_fake_write_event(slave->dcb);
            }
            else
            {
                spinlock_release(&slave->catch_lock);
            }
        }

        slave = slave->next;
    }
    spinlock_release(&router->lock);
}

/**
 * Write a raw event (the first 40 bytes at most) to a log file
 *
 * @param priority The syslog priority of the message (LOG_ERR, LOG_WARNING, etc.)
 * @param msg      A textual message to write before the packet
 * @param ptr      Pointer to the message buffer
 * @param len      Length of message packet
 */
static void
blr_log_packet(int priority, char *msg, uint8_t *ptr, int len)
{
    char buf[400] = "";
    char *bufp;
    int i;

    bufp = buf;
    bufp += sprintf(bufp, "%s length = %d: ", msg, len);
    for (i = 0; i < len && i < 40; i++)
    {
        bufp += sprintf(bufp, "0x%02x ", ptr[i]);
    }
    if (i < len)
    {
        MXS_LOG_MESSAGE(priority, "%s...", buf);
    }
    else
    {
        MXS_LOG_MESSAGE(priority, "%s", buf);
    }
}

/**
 * Check if the master connection is in place and we
 * are downlaoding binlogs
 *
 * @param router    The router instance
 * @return non-zero if we are recivign binlog records
 */
int
blr_master_connected(ROUTER_INSTANCE *router)
{
    return router->master_state == BLRM_BINLOGDUMP;
}

/**
 * Extract a result value from the set of messages that make up a
 * MySQL response packet.
 *
 * @param buf   The GWBUF containing the response
 * @param col   The column number to return
 * @return  The result form the column or NULL. The caller must free the result
 */
char *
blr_extract_column(GWBUF *buf, int col)
{
    uint8_t *ptr;
    int len, ncol, collen;
    char    *rval;

    if (buf == NULL)
    {
        return NULL;
    }

    ptr = (uint8_t *)GWBUF_DATA(buf);
    /* First packet should be the column count */
    len = EXTRACT24(ptr);
    ptr += 3;
    if (*ptr != 1)      // Check sequence number is 1
    {
        return NULL;
    }
    ptr++;
    ncol = *ptr++;
    if (ncol < col)     // Not that many column in result
    {
        return NULL;
    }
    // Now ptr points at the column definition
    while (ncol-- > 0)
    {
        len = EXTRACT24(ptr);
        ptr += 4;   // Skip to payload
        ptr += len; // Skip over payload
    }
    // Now we should have an EOF packet
    len = EXTRACT24(ptr);
    ptr += 4;       // Skip to payload
    if (*ptr != 0xfe)
    {
        return NULL;
    }
    ptr += len;

    // Finally we have reached the row
    len = EXTRACT24(ptr);
    ptr += 4;

    /** The first EOF packet signals the start of the resultset rows and the second
     EOF packet signals the end of the result set. If the resultset
     contains a second EOF packet right after the first one, the result set is empty and
     contains no rows. */
    if (len == 5 && *ptr == 0xfe)
    {
        return NULL;
    }

    while (--col > 0)
    {
        collen = *ptr++;
        ptr += collen;
    }
    collen = *ptr++;
    if ((rval = malloc(collen + 1)) == NULL)
    {
        return NULL;
    }
    memcpy(rval, ptr, collen);
    rval[collen] = 0;       // NULL terminate

    return rval;
}

/**
 * Read a replication event form current opened binlog into a GWBUF structure.
 *
 * @param router    The router instance
 * @param pos       Position of binlog record to read
 * @param hdr       Binlog header to populate
 * @return      The binlog record wrapped in a GWBUF structure
 */
GWBUF
*blr_read_events_from_pos(ROUTER_INSTANCE *router,
                          unsigned long long pos,
                          REP_HEADER *hdr,
                          unsigned long long pos_end)
{
    unsigned long long end_pos = 0;
    uint8_t hdbuf[19];
    uint8_t *data;
    GWBUF *result;
    int n;
    int event_limit;

    /* Get current binnlog position */
    end_pos = pos_end;

    /* end of file reached, we're done */
    if (pos == end_pos)
    {
        return NULL;
    }

    /* error */
    if (pos > end_pos)
    {
        MXS_ERROR("Reading saved events, the specified pos %llu "
                  "is ahead of current pos %lu for file %s",
                  pos, router->current_pos, router->binlog_name);
        return NULL;
    }

    /* Read the event header information from the file */
    if ((n = pread(router->binlog_fd, hdbuf, 19, pos)) != 19)
    {
        switch (n)
        {
        case 0:
            MXS_DEBUG("Reading saved events: reached end of binlog file at %llu.", pos);
            break;
        case -1:
            {
                char err_msg[STRERROR_BUFLEN];
                MXS_ERROR("Reading saved events: failed to read binlog "
                          "file %s at position %llu"
                          " (%s).", router->binlog_name,
                          pos, strerror_r(errno, err_msg, sizeof(err_msg)));

                if (errno == EBADF)
                {
                    MXS_ERROR("Reading saved events: bad file descriptor for file %s"
                              ", descriptor %d.",
                              router->binlog_name, router->binlog_fd);
                }
            }
            break;
        default:
            MXS_ERROR("Reading saved events: short read when reading the header. "
                      "Expected 19 bytes but got %d bytes. "
                      "Binlog file is %s, position %llu",
                      n, router->binlog_name, pos);
            break;
        }

        return NULL;
    }

    hdr->timestamp = EXTRACT32(hdbuf);
    hdr->event_type = hdbuf[4];
    hdr->serverid = EXTRACT32(&hdbuf[5]);
    hdr->event_size = extract_field(&hdbuf[9], 32);
    hdr->next_pos = EXTRACT32(&hdbuf[13]);
    hdr->flags = EXTRACT16(&hdbuf[17]);

    event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;

    if (hdr->event_type > event_limit)
    {
        MXS_ERROR("Reading saved events: invalid event type 0x%x. "
                  "Binlog file is %s, position %llu",
                  hdr->event_type,
                  router->binlog_name, pos);
        return NULL;
    }

    if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
    {
        MXS_ERROR("Reading saved events: failed to allocate memory for binlog entry, "
                  "size %d at %llu.",
                  hdr->event_size, pos);
        return NULL;
    }

    /* Copy event header*/
    data = GWBUF_DATA(result);
    memcpy(data, hdbuf, 19);

    /* Read event data and put int into buffer after header */
    if ((n = pread(router->binlog_fd, &data[19], hdr->event_size - 19, pos + 19)) != hdr->event_size - 19)
    {
        if (n == -1)
        {
            char err_msg[STRERROR_BUFLEN];
            MXS_ERROR("Reading saved events: the event at %llu in %s. "
                      "%s, expected %d bytes.",
                      pos, router->binlog_name,
                      strerror_r(errno, err_msg, sizeof(err_msg)), hdr->event_size - 19);
        }
        else
        {
            MXS_ERROR("Reading saved events: short read when reading "
                      "the event at %llu in %s. "
                      "Expected %d bytes got %d bytes.",
                      pos, router->binlog_name, hdr->event_size - 19, n);

            if (end_pos - pos < hdr->event_size)
            {
                MXS_ERROR("Reading saved events: binlog event "
                          "is close to the end of the binlog file, "
                          "current file size is %llu.", end_pos);
            }
        }

        /* free buffer */
        gwbuf_free(result);

        return NULL;
    }

    return result;
}

/**
 * Stop and start the master connection
 *
 * @param router    The router instance
 */
void
blr_stop_start_master(ROUTER_INSTANCE *router)
{
    if (router->master)
    {
        if (router->master->fd != -1 && router->master->state == DCB_STATE_POLLING)
        {
            blr_master_close(router);
        }
    }

    spinlock_acquire(&router->lock);

    router->master_state = BLRM_SLAVE_STOPPED;

    /* set last_safe_pos */
    router->last_safe_pos = router->binlog_position;

    /**
     * Set router->prevbinlog to router->binlog_name
     * The FDE event with current filename may arrive after STOP SLAVE is received
     */

    if (strcmp(router->binlog_name, router->prevbinlog) != 0)
    {
        strncpy(router->prevbinlog, router->binlog_name, BINLOG_FNAMELEN);
    }

    if (router->client)
    {
        if (router->client->fd != -1 && router->client->state == DCB_STATE_POLLING)
        {
            dcb_close(router->client);
            router->client = NULL;
        }
    }

    /* Discard the queued residual data */
    while (router->residual)
    {
        router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
    }
    router->residual = NULL;

    router->master_state = BLRM_UNCONNECTED;
    spinlock_release(&router->lock);

    blr_master_reconnect(router);
}

/**
 * The heartbeat check function called from the housekeeper.
 * We can try a new master connection if current one is seen out of date
 *
 * @param router    Current router instance
 */

static void
blr_check_last_master_event(void *inst)
{
    ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst;
    int master_check = 1;
    int master_state =  BLRM_UNCONNECTED;
    char task_name[BLRM_TASK_NAME_LEN + 1] = "";

    spinlock_acquire(&router->lock);

    master_check = blr_check_heartbeat(router);

    master_state = router->master_state;

    spinlock_release(&router->lock);

    if (!master_check)
    {
        /*
         * stop current master connection
         * and try a new connection
         */
        blr_stop_start_master(router);
    }

    if ( (!master_check) || (master_state != BLRM_BINLOGDUMP) )
    {
        /*
         * Remove the task, it will be added again
         * when master state is back to BLRM_BINLOGDUMP
         * by blr_master_response()
         */
        snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name);

        hktask_remove(task_name);
    }
}

/**
 * Check last heartbeat or last received event against router->heartbeat time interval
 *
 * checked interval is againts (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME)
 * that is currently set to 1
 *
 * @param router    Current router instance
 * @return      0 if master connection must be closed and opened again, 1 otherwise
 */

int
blr_check_heartbeat(ROUTER_INSTANCE *router)
{
    time_t t_now = time(0);
    char *event_desc = NULL;

    if (router->master_state != BLRM_BINLOGDUMP)
    {
        return 1;
    }

    event_desc = blr_last_event_description(router);

    if (router->master_state == BLRM_BINLOGDUMP && router->lastEventReceived > 0)
    {
        if ((t_now - router->stats.lastReply) > (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME))
        {
            MXS_ERROR("No event received from master %s:%d in heartbeat period (%lu seconds), "
                      "last event (%s %d) received %lu seconds ago. Assuming connection is dead "
                      "and reconnecting.",
                      router->service->dbref->server->name,
                      router->service->dbref->server->port,
                      router->heartbeat,
                      event_desc != NULL ? event_desc : "unknown",
                      router->lastEventReceived,
                      t_now - router->stats.lastReply);

            return 0;
        }
    }

    return 1;
}

/**
 * Log binlog router identy after master registration, state is BLRM_BINLOG_DUMP
 *
 * @param   router  The router instance
 */

static void blr_log_identity(ROUTER_INSTANCE *router)
{
    char *master_uuid;
    char *master_hostname;
    char *master_version;

    if (router->set_master_version)
    {
        master_version  = router->set_master_version;
    }
    else
    {
        master_version = blr_extract_column(router->saved_master.selectver, 1);
    }

    if (router->set_master_hostname)
    {
        master_hostname  = router->set_master_hostname;
    }
    else
    {
        master_hostname = blr_extract_column(router->saved_master.selecthostname, 1);
    }

    if (router->set_master_uuid)
    {
        master_uuid = router->master_uuid;
    }
    else
    {
        master_uuid = blr_extract_column(router->saved_master.uuid, 2);
    }

    /* Seen by the master */
    MXS_NOTICE("%s: identity seen by the master: "
               "server_id: %d, uuid: %s",
               router->service->name,
               router->serverid, (router->uuid == NULL ? "not available" : router->uuid));

    /* Seen by the slaves */

    /* MariaDB 5.5 and MariaDB don't have the MASTER_UUID var */
    if (master_uuid == NULL)
    {
        MXS_NOTICE("%s: identity seen by the slaves: "
                   "server_id: %d, hostname: %s, MySQL version: %s",
                   router->service->name,
                   router->masterid, (master_hostname == NULL ? "not available" : master_hostname),
                   (master_version == NULL ? "not available" : master_version));
    }
    else
    {
        MXS_NOTICE("%s: identity seen by the slaves: "
                   "server_id: %d, uuid: %s, hostname: %s, MySQL version: %s",
                   router->service->name,
                   router->masterid, master_uuid,
                   (master_hostname == NULL ? "not available" : master_hostname),
                   (master_version == NULL ? "not available" : master_version));
    }
}

/**
 * Distribute an error message to all the registered slaves.
 *
 * @param   router      The router instance
 * @param   message     The message to send
 * @param   state       The MySQL State for message
 * @param   err_code    The MySQL error code for message
 */
static void
blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code)
{
    ROUTER_SLAVE *slave;

    spinlock_acquire(&router->lock);

    slave = router->slaves;
    while (slave)
    {
        if (slave->state != BLRS_DUMPING)
        {
            slave = slave->next;
            continue;
        }

        /* send the error that stops slave replication */
        blr_send_custom_error(slave->dcb, slave->seqno++, 0, message, state, err_code);

        slave = slave->next;
    }

    spinlock_release(&router->lock);
}

int
blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf)
{
    int n;

    if ((n = pwrite(router->binlog_fd, buf, data_len,
                    router->last_written)) != data_len)
    {
        char err_msg[STRERROR_BUFLEN];
        MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. "
                  "Truncating to previous record.",
                  router->service->name, router->last_written,
                  router->binlog_name,
                  strerror_r(errno, err_msg, sizeof(err_msg)));

        /* Remove any partial event that was written */
        if (ftruncate(router->binlog_fd, router->binlog_position))
        {
            MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
                      router->service->name, router->binlog_position,
                      router->binlog_name,
                      strerror_r(errno, err_msg, sizeof(err_msg)));
        }
        return 0;
    }
    router->last_written += data_len;
    return n;
}

/**
 * Send a replication event packet to a slave
 *
 * The first replication event packet contains one byte set to either
 * 0x0, 0xfe or 0xff which signals what the state of the replication stream is.
 * If the data pointed by @c buf is not the start of the replication header
 * and part of the replication event is already sent, @c first must be set to
 * false so that the first status byte is not sent again.
 *
 * @param slave Slave where the packet is sent to
 * @param buf Buffer containing the data
 * @param len Length of the data
 * @param first If this is the first packet of a multi-packet event
 * @return True on success, false when memory allocation fails
 */
bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first)
{
    bool rval = true;
    unsigned int datalen = len + (first ? 1 : 0);
    GWBUF *buffer = gwbuf_alloc(datalen + MYSQL_HEADER_LEN);
    if (buffer)
    {
        uint8_t *data = GWBUF_DATA(buffer);
        encode_value(data, datalen, 24);
        data += 3;
        *data++ = slave->seqno++;

        if (first)
        {
            *data++ = 0; // OK byte
        }

        if (len > 0)
        {
            memcpy(data, buf, len);
        }

        slave->stats.n_bytes += GWBUF_LENGTH(buffer);
        slave->dcb->func.write(slave->dcb, buffer);
    }
    else
    {
        MXS_ERROR("failed to allocate %ld bytes of memory when writing an"
                  " event.", datalen + MYSQL_HEADER_LEN);
        rval = false;
    }
    return rval;
}

/**
 * Send a single replication event to a slave
 *
 * This sends the complete replication event to a slave. If the event size exceeds
 * the maximum size of a MySQL packet, it will be sent in multiple packets.
 *
 * @param role  What is the role of the caller, slave or master.
 * @param binlog_name The name of the binlogfile.
 * @param binlog_pos The position in the binlogfile.
 * @param slave Slave where the event is sent to
 * @param hdr   Replication header
 * @param buf   Pointer to the replication event as it was read from the disk
 * @return True on success, false if memory allocation failed
 */
bool blr_send_event(blr_thread_role_t role,
                    const char* binlog_name,
                    uint32_t binlog_pos,
                    ROUTER_SLAVE *slave,
                    REP_HEADER *hdr,
                    uint8_t *buf)
{
    bool rval = true;

    if ((strcmp(slave->lsi_binlog_name, binlog_name) == 0) &&
        (slave->lsi_binlog_pos == binlog_pos))
    {
        MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s', position %u: "
                  "thread %lu in the role of %s could not send the event, "
                  "the event has already been sent by thread %lu in the role of %s. "
                  "%u bytes buffered for writing in DCB %p. %lu events received from master.",
                  slave->dcb->remote,
                  ntohs((slave->dcb->ipv4).sin_port),
                  slave->serverid,
                  binlog_name,
                  binlog_pos,
                  thread_self(),
                  ROLETOSTR(role),
                  slave->lsi_sender_tid,
                  ROLETOSTR(slave->lsi_sender_role),
                  gwbuf_length(slave->dcb->writeq), slave->dcb,
                  slave->router->stats.n_binlogs);
        return false;
    }

    /** Check if the event and the OK byte fit into a single packet  */
    if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX)
    {
        rval = blr_send_packet(slave, buf, hdr->event_size, true);
    }
    else
    {
        /** Total size of all the payloads in all the packets */
        int64_t len = hdr->event_size + 1;
        bool first = true;

        while (rval && len > 0)
        {
            uint64_t payload_len = first ? MYSQL_PACKET_LENGTH_MAX - 1 :
                                   MIN(MYSQL_PACKET_LENGTH_MAX, len);

            if (blr_send_packet(slave, buf, payload_len, first))
            {
                /** The check for exactly 0x00ffffff bytes needs to be done
                 * here as well */
                if (len == MYSQL_PACKET_LENGTH_MAX)
                {
                    blr_send_packet(slave, buf, 0, false);
                }

                /** Add the extra byte written by blr_send_packet */
                len -= first ? payload_len + 1 : payload_len;
                buf += payload_len;
                first = false;
            }
            else
            {
                rval = false;
            }
        }
    }

    slave->stats.n_events++;

    if (rval)
    {
        strcpy(slave->lsi_binlog_name, binlog_name);
        slave->lsi_binlog_pos = binlog_pos;
        slave->lsi_sender_role = role;
        slave->lsi_sender_tid = thread_self();
    }
    else
    {
        MXS_ERROR("Failed to send an event of %u bytes to slave at %s:%d.",
                  hdr->event_size, slave->dcb->remote,
                  ntohs(slave->dcb->ipv4.sin_port));
    }
    return rval;
}

/**
 * Extract the checksum from the binlogs
 *
 * This updates the internal state of the router and will allow us to detect
 * if the checksum is split across two packets.
 * @param router Router instance
 * @param cksumptr Pointer to the checksum
 * @param len How much of the data is readable
 */
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len)
{
    uint8_t *ptr = cksumptr;
    while (ptr - cksumptr < len)
    {
        router->partial_checksum[router->partial_checksum_bytes] = *ptr;
        ptr++;
        router->partial_checksum_bytes++;
    }
}

/**
 * Stop the slave connection and log errors
 *
 * @param router Router instance
 * @param ptr Pointer to the start of the packet
 * @param len Length of the packet
 */
static void blr_terminate_master_replication(ROUTER_INSTANCE* router, uint8_t* ptr, int len)
{
    unsigned long mysql_errno = extract_field(ptr + 5, 16);
    int msg_len = len - 7 - 6; // msg len is decreased by 7 and 6
    char *msg_err = (char *)malloc(msg_len + 1);

    strncpy(msg_err, (char *)ptr + 7 + 6, msg_len);
    *(msg_err + msg_len) = '\0';

    spinlock_acquire(&router->lock);

    char* old_errmsg = router->m_errmsg;
    router->m_errmsg = msg_err;
    router->m_errno = mysql_errno;
    router->master_state = BLRM_SLAVE_STOPPED;
    router->stats.n_binlog_errors++;

    spinlock_release(&router->lock);

    free(old_errmsg);
    MXS_ERROR("Error packet in binlog stream.%s @ %lu.",
              router->binlog_name, router->current_pos);

}
